Let us set some global options for all code chunks in this document.
knitr::opts_chunk$set(
message = FALSE, # Disable messages printed by R code chunks
warning = FALSE, # Disable warnings printed by R code chunks
echo = TRUE, # Show R code within code chunks in output
include = TRUE, # Include both R code and its results in output
eval = TRUE, # Evaluate R code chunks
cache = FALSE, # Enable caching of R code chunks for faster rendering
fig.align = "center",
out.width = "100%",
retina = 2,
error = TRUE,
collapse = TRUE
)
rm(list = ls())
set.seed(1982){python}
import os
import time
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchdiffeq import odeint
import matplotlib.pyplot as plt
import matplotlib
matplotlib.rcParams['figure.dpi'] = 400
# Set your desired parameters here
method = 'dopri5'
data_size = 1000
batch_time = 10
batch_size = 20
niters = 2000 #2000
test_freq = 100#20
viz = True
gpu = 0
adjoint = False
device = torch.device('cuda:' + str(gpu) if torch.cuda.is_available() else 'cpu')
true_y0 = torch.tensor([[2., 0.]]).to(device)
t = torch.linspace(0., 25., data_size).to(device)
true_A = torch.tensor([[-0.1, 2.0], [-2.0, -0.1]]).to(device)
class Lambda(nn.Module):
def forward(self, t, y):
return torch.mm(y**3, true_A)
with torch.no_grad():
true_y = odeint(Lambda(), true_y0, t, method='dopri5')
def get_batch():
s = torch.from_numpy(np.random.choice(np.arange(data_size - batch_time, dtype=np.int64), batch_size, replace=False))
batch_y0 = true_y[s] # (M, D)
batch_t = t[:batch_time] # (T)
batch_y = torch.stack([true_y[s + i] for i in range(batch_time)], dim=0) # (T, M, D)
return batch_y0.to(device), batch_t.to(device), batch_y.to(device)
def visualize(true_y, pred_y, odefunc, ii, itr):
if viz:
# Create the directory if it doesn't exist
if not os.path.exists('png'):
os.makedirs('png')
fig, (ax_traj, ax_phase) = plt.subplots(1, 2, figsize=(10, 5))
ax_traj.set_title(f"Trajectories")
ax_traj.set_xlabel('$t$')
ax_traj.set_ylabel('$x(t),y(t)$')
ax_traj.plot(t.cpu().numpy(), true_y.cpu().numpy()[:, 0, 0], t.cpu().numpy(), true_y.cpu().numpy()[:, 0, 1], 'g-')
ax_traj.plot(t.cpu().numpy(), pred_y.cpu().numpy()[:, 0, 0], '--', t.cpu().numpy(), pred_y.cpu().numpy()[:, 0, 1], 'b--')
ax_traj.set_xlim(t.cpu().min(), t.cpu().max())
ax_traj.set_ylim(-2, 2)
ax_traj.legend(['True x', 'True y', 'Predicted x', 'Predicted y'])
ax_phase.set_title('Iter {:04d} | Total Loss {:.6f}'.format(itr, loss.item()))
ax_phase.set_xlabel('$x(t)$')
ax_phase.set_ylabel('$y(t)$')
ax_phase.plot(true_y.cpu().numpy()[:, 0, 0], true_y.cpu().numpy()[:, 0, 1], 'k-')
ax_phase.plot(pred_y.cpu().numpy()[:, 0, 0], pred_y.cpu().numpy()[:, 0, 1], 'r--')
ax_phase.set_xlim(-2, 2)
ax_phase.set_ylim(-2, 2)
ax_phase.legend(['True', 'Predicted'])
plt.tight_layout()
plt.savefig('png/{:03d}.png'.format(ii))
plt.show()
class ODEFunc(nn.Module):
def __init__(self):
super(ODEFunc, self).__init__()
self.net = nn.Sequential(
nn.Linear(2, 50),
nn.Tanh(),
nn.Linear(50, 2),
)
for m in self.net.modules():
if isinstance(m, nn.Linear):
nn.init.normal_(m.weight, mean=0, std=0.1)
nn.init.constant_(m.bias, val=0)
def forward(self, t, y):
return self.net(y**3)
class RunningAverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self, momentum=0.99):
self.momentum = momentum
self.reset()
def reset(self):
self.val = None
self.avg = 0
def update(self, val):
if self.val is None:
self.avg = val
else:
self.avg = self.avg * self.momentum + val * (1 - self.momentum)
self.val = val
ii = 0
func = ODEFunc().to(device)
optimizer = optim.RMSprop(func.parameters(), lr=1e-3)
end = time.time()
time_meter = RunningAverageMeter(0.97)
loss_meter = RunningAverageMeter(0.97)
for itr in range(1, niters + 1):
optimizer.zero_grad()
batch_y0, batch_t, batch_y = get_batch()
pred_y = odeint(func, batch_y0, batch_t, method = 'euler').to(device)
loss = torch.mean(torch.abs(pred_y - batch_y))
loss.backward()
optimizer.step()
time_meter.update(time.time() - end)
loss_meter.update(loss.item())
if itr % test_freq == 0:
with torch.no_grad():
pred_y = odeint(func, true_y0, t, method = 'euler')
loss = torch.mean(torch.abs(pred_y - true_y))
print('Iter {:04d} | Total Loss {:.6f}'.format(itr, loss.item()))
visualize(true_y, pred_y, func, ii, itr)
ii += 1
end = time.time()
{python}
import os
import imageio
def create_gif_from_folder(folder_path, output_path, duration=1):
images = []
# Read images from the folder
for filename in sorted(os.listdir(folder_path)):
if filename.endswith('.png'):
images.append(imageio.imread(os.path.join(folder_path, filename)))
# Create GIF
imageio.mimsave(output_path, images, duration=duration)
# Example usage
folder_path = "~/Desktop/Spring 2024/SFDA/Vignettes/png"
output_path = "~/Desktop/Spring 2024/SFDA/Vignettes/png/output.gif"
folder_path = os.path.expanduser(folder_path)
output_path = os.path.expanduser(output_path)
create_gif_from_folder(folder_path, output_path)
knitr::include_graphics("~/Desktop/Spring 2024/SFDA/Vignettes/png/output.gif")